-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Telemetry (#905) #918
base: master
Are you sure you want to change the base?
Telemetry (#905) #918
Conversation
using :telemetry.span
@tag :telemetry | ||
|
||
setup %{links: links} do | ||
ref = make_ref() | ||
|
||
events = | ||
for event <- @events, | ||
step <- @steps do | ||
[:membrane, :element, event, step] | ||
end | ||
|
||
:telemetry.attach_many(ref, events, &TelemetryListener.handle_event/4, %{ | ||
dest: self(), | ||
ref: ref | ||
}) | ||
|
||
pid = Testing.Pipeline.start_link_supervised!(spec: links) | ||
:ok = Testing.Pipeline.terminate(pid) | ||
[ref: ref] | ||
end | ||
|
||
# Test each lifecycle step for each element type | ||
for element_type <- @paths, | ||
event <- @events do | ||
test "#{element_type}/#{event}", %{ref: ref} do | ||
element_type = unquote(element_type) | ||
event = unquote(event) | ||
|
||
assert_receive {^ref, :telemetry_ack, | ||
{[:membrane, :element, ^event, :start], meta, %{path: [_, ^element_type]}}} | ||
|
||
assert meta.system_time > 0 | ||
|
||
assert_receive {^ref, :telemetry_ack, | ||
{[:membrane, :element, ^event, :stop], meta, %{path: [_, ^element_type]}}} | ||
|
||
assert meta.duration > 0 | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tag :telemetry
will have impact only on the first test
defined after it, so it should be moved inside for
statement
assert_receive {^ref, :telemetry_ack, | ||
{[:membrane, :element, ^event, :stop], meta, %{path: [_, ^element_type]}}} | ||
|
||
assert meta.duration > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the execution of the callback is really fast, the duration
can be 0 and it won't be a bug. I would rather check if it is not a negative number.
assert_receive {^ref, :telemetry_ack, | ||
{[:membrane, :element, ^event, :start], meta, %{path: [_, ^element_type]}}} | ||
|
||
assert meta.system_time > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we want to have system_time
in a metadata, maybe monotonic_time
would be better?
links = [ | ||
child(:source, %Testing.Source{output: [~c"a", ~c"b", ~c"c"]}) | ||
|> child(:filter, %TestFilter{target: self()}) | ||
|> child(:sink, Testing.Sink) | ||
] | ||
|
||
[links: links, ref: ref] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend you to rename links
to spec
in this file, this will be more consistent with the naming convention used in the other places in membrane
def_input_pad :input, flow_control: :manual, accepted_format: _any, demand_unit: :buffers | ||
def_output_pad :output, flow_control: :manual, accepted_format: _any | ||
def_options target: [spec: pid()] | ||
|
||
@impl true | ||
def handle_demand(_pad, size, _unit, _context, state) do | ||
{[demand: {:input, size}], state} | ||
end | ||
|
||
@impl true | ||
def handle_buffer(_pad, _buffer, _context, state), do: {[], state} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove flow_control: :manual
option (so it will be set to the default :auto
) and remove handle_demand
implementation - If you do so the element should be as useful as it is right now
Telemetry.report_span :element, :terminate do | ||
apply(module, callback, args) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This apply
executes all custom callbacks implemented in the membrane components, not only terminate (but handle_terminate
is executed here as well as the rest of the handle_*
callbacks) and not only in Elements, but also in Bins and Pipelines. Take a look at the path to the file or module name: there is no element
in them, so it suggests that this code is used not only in elements.
Telemetry.report_span :element, :terminate do | |
apply(module, callback, args) | |
end | |
component_type = | |
case state do | |
%Membrane.Core.Element.State{} -> :element | |
# the same for bin and pipeline | |
end | |
Telemetry.report_span component_type, callback do | |
apply(module, callback, args) | |
end |
Doing it this way will emit event for ALL callbacks and I think this is the thing we want to do
Telemetry.report_span :bin, :init do | ||
do_init(options) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we will have an appropriate span for handle_init
in CallbackHandler
, there is no need for span for :init
here
I took a brief look at your changes and I commented things that caught my attention |
Relates to #905